-
Notifications
You must be signed in to change notification settings - Fork 152
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Fix a leak in PrefixAndTail operator. #1623
base: main
Are you sure you want to change the base?
Conversation
713a8e9
to
62e60a5
Compare
emit(out, (builder.result(), Source.empty), () => completeStage()) | ||
val prefix = builder.result(); | ||
builder = null // free for GC | ||
emit(out, (prefix, Source.empty), () => completeStage()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it will leak before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good - if @queimadus can try this change too that would be very useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I actually can't seem to reproduce the issue with the example I mentioned with flatMapPrefix
with prefixAndTail
.
E.g. running the following runs fine with and without these changes
val s = Source
.repeat(())
.map(_ => ByteString('a' * 4000000))
.take(1000000)
.prefixAndTail(50000)
.flatMapConcat { case (prefix, tail) => tail }
Source.empty
.concatAllLazy(List.tabulate(30000)(_ => s): _*)
.runWith(Sink.ignore).onComplete(println(_))
What doesn't run fine is the the following snippet, but it may be a different problem.
def myLogic(prefix: Seq[ByteString]): Flow[ByteString, ByteString, NotUsed] =
Flow[ByteString]
val s = Source
.repeat(())
.map(_ => ByteString('a' * 400000))
.take(1000000)
.prefixAndTail(50000)
.flatMapConcat { case (prefix, tail) =>
Source(prefix).concatLazy(tail).via(myLogic(prefix))
}
Source.empty
.concatAllLazy(List.tabulate(30000)(_ => s): _*)
.runWith(Sink.ignore).onComplete(println(_))
That said, these changes still seem to make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This problem is a kind of AsyncCallback leak, #408
where many instances consume memories.
To totally fix this, we will lose some support for debugging I think, eg the org.apache.pekko.stream.impl.fusing.GraphInterpreter#toSnapshot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@queimadus Yes, different problem, the new problem is the interpreter keeps many things in memory
62e60a5
to
42cca6b
Compare
Motivation:
Fix @queimadus's report in #1566
Modification:
When the upstream finishes quickly but the downstream is not pulled, a leak can happen, so clean up that.
Result:
I think the leak should be fixed now
code from @queimadus
before:
after:
Seems there are more leak points.
I think the problem is the current logic design where arrays are been used to track and then with the current deign, there will be many many instances of the logic.
So That problem is harder to fix than we expected, but this pr is still valid.